package com.shujia.sql

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

object Code15Plant {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("sparkSession")
      .master("local")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    // 使用列表达式
    import spark.implicits._

    val user_low_carbonDf: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("user_id String,date_dt String,low_carbon Int")
      .load("spark_code/data/user_low_carbon.txt")


    val plant_carbonDf: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("plant_id String,plant_name String,plant_carbon Int")
      .load("spark_code/data/plant_carbon.txt")


    user_low_carbonDf.show()
    plant_carbonDf.show()

    val hy_plant_carbon: Int = plant_carbonDf
      .where($"plant_name" === "胡杨")
      .select($"plant_carbon")
      // head 可以获取一行数据将其转换成 Row对象
      .head()
      // 通过下标获取对应Row中的数据
      .getInt(0)

    //    println(hy_plant_carbon)


    val sl_plant_carbon: Int = plant_carbonDf
      .where($"plant_name" === "沙柳")
      .head()
      // 通过getAs取出一行中对应某列的数据
      .getAs[Int]("plant_carbon")


    // 需要对每个用户 计算炭的总量
    val user_low_carbon_formatDf: DataFrame = user_low_carbonDf
      //     先需要对数据过滤 1月1号到10月1号 的数据
      // 1.将日期转换成标准的格式
      .withColumn("format_date", from_unixtime(unix_timestamp($"date_dt", "yyyy/MM/dd"), "yyyy-MM-dd")).cache()

    user_low_carbon_formatDf
      // 2.过滤数据
      //      .where(expr("2017-01-01 <= format_date"))
      .where($"format_date" >= "2017-01-01" and $"format_date" <= "2017-10-01")
      // 3.对每个用户的碳进行求和
      .groupBy($"user_id")
      .agg(sum($"low_carbon") as "sum_low_carbon")
      // 4.满足申领条件的用户都申领了一颗p004-胡杨
      .where($"sum_low_carbon" >= hy_plant_carbon)
      .withColumn("sum_low_carbon_diff", $"sum_low_carbon" - hy_plant_carbon)
      // 5.剩余的能量全部用来领取“p002-沙柳”
      .withColumn("plant_count", floor($"sum_low_carbon_diff" / sl_plant_carbon))
      // 6. 求排名
      .withColumn("rn", row_number() over Window.orderBy($"plant_count".desc))
      .where($"rn" <= 11)
      // 7.对其求下一行 并拿当前行减去下一行
      .withColumn("less_count", lead("plant_count", 1, 0) over Window.orderBy($"plant_count".desc))
      .where($"rn" <= 10)
      .select($"user_id", $"plant_count", $"plant_count" - $"less_count" as "less_count")
      .show()


    //问题：查询user_low_carbon表中每日流水记录，条件为：
    //用户在2017年，连续三天（或以上）的天数里，

    user_low_carbon_formatDf
      // 1.统计每个用户每天的能量总和
      .groupBy($"user_id", $"format_date")
      .agg(sum($"low_carbon") as "sum_low_carbon_day")
      // 过滤大于100g
      .where($"sum_low_carbon_day" >= 100)
      // 排名
      .withColumn("rn", row_number() over Window.partitionBy($"user_id").orderBy("format_date"))
      // 对日期减去排名得到其标记日期
      .withColumn("flag_date", expr("date_sub(format_date,rn)"))
      //      .groupBy($"user_id",$"flag_date")
      //      .agg(count("*") as "num")
      // 由于后续需要去取出其中的format_date 所以需要使用窗口函数 对其进行统计个数
      .withColumn("num", count("*") over Window.partitionBy($"user_id", $"flag_date"))
      .where($"num" >= 3)
      // 关联源表中的数据 对其进行过滤取字段
      .join(user_low_carbon_formatDf, List("user_id", "format_date"))
      .show()
  }
}
